/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.synthetic;



import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.coders.ByteArrayCoder;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.KvCoder;

import com.bff.gaia.unified.sdk.io.OffsetBasedSource;

import com.bff.gaia.unified.sdk.io.synthetic.delay.ReaderDelay;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import java.io.IOException;

import java.util.List;

import java.util.NoSuchElementException;

import java.util.stream.Collectors;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/**
 * A bounded, offset-based source that generates {@code KV<byte[], byte[]>} records.
 *
 * <p>The {@link SyntheticBoundedSource} generates a {@link PCollection} of {@code KV<byte[],
 * byte[]>}. A fraction of the generated records {@code KV<byte[], byte[]>} are associated with
 * "hot" keys, which are uniformly distributed over a fixed number of hot keys. The remaining
 * generated records are associated with "random" keys. Each record will be slowed down by a certain
 * sleep time generated based on the specified sleep time distribution when the {@link
 * SyntheticBoundedSource.SyntheticSourceReader} reads each record. The record {@code KV<byte[],
 * byte[]>} is generated deterministically based on the record's position in the source, which
 * enables repeatable execution for debugging. The configurable parameters of {@link
 * SyntheticBoundedSource} are defined in {@link SyntheticSourceOptions}.
 */
@Experimental(Experimental.Kind.SOURCE_SINK)
public class SyntheticBoundedSource extends OffsetBasedSource<KV<byte[], byte[]>> {

  private static final long serialVersionUID = 0;

  private static final Logger LOG = LoggerFactory.getLogger(SyntheticBoundedSource.class);

  private final SyntheticSourceOptions sourceOptions;

  private final BundleSplitter bundleSplitter;

  /**
   * Creates a source covering the offset range {@code [0, sourceOptions.numRecords)}.
   *
   * @param sourceOptions options that drive record generation, sizing and splitting
   */
  public SyntheticBoundedSource(SyntheticSourceOptions sourceOptions) {
    this(0, sourceOptions.numRecords, sourceOptions);
  }

  /**
   * Creates a source covering the offset range {@code [startOffset, endOffset)}.
   *
   * @param startOffset first offset of the range (inclusive)
   * @param endOffset end of the range (exclusive)
   * @param sourceOptions options that drive record generation, sizing and splitting
   */
  public SyntheticBoundedSource(
      long startOffset, long endOffset, SyntheticSourceOptions sourceOptions) {
    super(startOffset, endOffset, 1);
    this.sourceOptions = sourceOptions;
    this.bundleSplitter = new BundleSplitter(this.sourceOptions);
    // Hand the object itself to the logger so that the string form is only built when debug
    // logging is actually enabled.
    LOG.debug("Constructing {}", this);
  }

  /** Returns a {@link KvCoder} that encodes both the key and the value as raw byte arrays. */
  @Override
  public Coder<KV<byte[], byte[]>> getDefaultOutputCoder() {
    return KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of());
  }

  /**
   * Returns the number of bytes attributed to each offset (i.e. each record).
   *
   * <p>A non-negative {@code bytesPerRecord} option takes precedence; otherwise the sum of the
   * configured key and value sizes is used.
   */
  @Override
  // TODO: test cases where the source size could not be estimated (i.e., return 0).
  // TODO: test cases where the key size and value size might differ from record to record.
  // The key size and value size might have their own distributions.
  public long getBytesPerOffset() {
    return sourceOptions.bytesPerRecord >= 0
        ? sourceOptions.bytesPerRecord
        : sourceOptions.keySizeBytes + sourceOptions.valueSizeBytes;
  }

  /** Validates the offset range via the superclass and then the source options themselves. */
  @Override
  public void validate() {
    super.validate();
    sourceOptions.validate();
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("options", sourceOptions)
        .add("offsetRange", "[" + getStartOffset() + ", " + getEndOffset() + ")")
        .toString();
  }

  /**
   * Creates a source for the sub-range {@code [start, end)} that shares this source's options.
   *
   * @param start first offset of the sub-range; must not be smaller than this source's start
   * @param end end of the sub-range (exclusive); must not be larger than this source's end
   * @throws IllegalArgumentException if the sub-range is not contained in this source's range
   */
  @Override
  public final SyntheticBoundedSource createSourceForSubrange(long start, long end) {
    // Message templates keep the happy path free of string concatenation; the rendered text is
    // the same as a hand-concatenated message would be.
    checkArgument(
        start >= getStartOffset(),
        "Start offset value %s of the subrange cannot be smaller than the start offset value %s"
            + " of the parent source",
        start,
        getStartOffset());
    checkArgument(
        end <= getEndOffset(),
        "End offset value %s of the subrange cannot be larger than the end offset value %s"
            + " of the parent source",
        end,
        getEndOffset());

    return new SyntheticBoundedSource(start, end, sourceOptions);
  }

  /** Returns this source's end offset; the range is fixed at construction time. */
  @Override
  public long getMaxEndOffset(PipelineOptions options) {
    return getEndOffset();
  }

  @Override
  public SyntheticSourceReader createReader(PipelineOptions pipelineOptions) {
    return new SyntheticSourceReader(this);
  }

  /**
   * Splits this source into sub-sources whose offset ranges are chosen by the {@link
   * BundleSplitter}.
   *
   * <p>The number of bundles requested from the splitter is {@code forceNumInitialBundles} when
   * that option is set; otherwise it is derived from the estimated source size and {@code
   * desiredBundleSizeBytes}.
   *
   * @param desiredBundleSizeBytes size hint for each bundle; values below one byte are treated as
   *     one byte
   * @param options pipeline options, forwarded to the size estimation
   * @return the sub-sources produced for the ranges returned by the splitter
   */
  @Override
  public List<SyntheticBoundedSource> split(long desiredBundleSizeBytes, PipelineOptions options)
      throws Exception {
    int desiredNumBundles = chooseNumBundles(desiredBundleSizeBytes, options);

    List<SyntheticBoundedSource> res =
        bundleSplitter.getBundleSizes(desiredNumBundles, this.getStartOffset(), this.getEndOffset())
            .stream()
            .map(offsetRange -> createSourceForSubrange(offsetRange.getFrom(), offsetRange.getTo()))
            .collect(Collectors.toList());
    LOG.info("Split into {} bundles of sizes: {}", res.size(), res);
    return res;
  }

  /**
   * Chooses the number of bundles either from the explicit option or from the size estimate and
   * the size hint.
   */
  private int chooseNumBundles(long desiredBundleSizeBytes, PipelineOptions options)
      throws Exception {
    if (sourceOptions.forceNumInitialBundles != null) {
      return sourceOptions.forceNumInitialBundles;
    }
    // A hint of zero (or less) would turn the division below into Infinity/NaN and, after the
    // int cast, into Integer.MAX_VALUE or 0 requested bundles. Treat it as the smallest
    // meaningful bundle size instead.
    long bundleSizeBytes = Math.max(1L, desiredBundleSizeBytes);
    int numBundles = (int) Math.ceil(1.0 * getEstimatedSizeBytes(options) / bundleSizeBytes);
    // A size estimate of zero yields zero bundles; always request at least one so that the
    // whole offset range is still handed to the splitter as a single bundle.
    return Math.max(1, numBundles);
  }

  /**
   * A reader over the {@link PCollection} of {@code KV<byte[], byte[]>} from the synthetic source.
   *
   * <p>The random but deterministic record at position "i" in the range [A, B) is generated by
   * using {@link SyntheticSourceOptions#genRecord}. Starting the reader and reading each record
   * are both routed through a {@link ReaderDelay} built from the source options.
   */
  private static class SyntheticSourceReader extends OffsetBasedReader<KV<byte[], byte[]>> {

    /** Split point spacing in records, copied from the source options at construction. */
    private final long splitPointFrequencyRecords;

    private final ReaderDelay readerDelay;

    /** The most recently generated record, or {@code null} before the first read. */
    private KV<byte[], byte[]> currentKvPair;

    private long currentOffset;

    private boolean isAtSplitPoint;

    SyntheticSourceReader(SyntheticBoundedSource source) {
      super(source);
      this.readerDelay = new ReaderDelay(source.sourceOptions);
      this.currentKvPair = null;
      this.splitPointFrequencyRecords = source.sourceOptions.splitPointFrequencyRecords;
    }

    @Override
    public synchronized SyntheticBoundedSource getCurrentSource() {
      return (SyntheticBoundedSource) super.getCurrentSource();
    }

    @Override
    protected long getCurrentOffset() throws IllegalStateException {
      return currentOffset;
    }

    /**
     * Returns the most recently generated record.
     *
     * @throws NoSuchElementException if no record has been generated yet
     */
    @Override
    public KV<byte[], byte[]> getCurrent() throws NoSuchElementException {
      if (currentKvPair == null) {
        throw new NoSuchElementException(
            "The current element is unavailable because either the reader is "
                + "at the beginning of the input and start() or advance() wasn't called, "
                + "or the last start() or advance() returned false.");
      }
      return currentKvPair;
    }

    /** Dynamic splitting is allowed only when a positive split point frequency is configured. */
    @Override
    public boolean allowsDynamicSplitting() {
      return splitPointFrequencyRecords > 0;
    }

    /**
     * Positions the reader on the first record: the source's start offset, moved forward to the
     * next multiple of the split point frequency when that frequency is positive.
     */
    @Override
    protected final boolean startImpl() throws IOException {
      long firstOffset = getCurrentSource().getStartOffset();
      if (splitPointFrequencyRecords > 0) {
        firstOffset = alignToSplitPoint(firstOffset, splitPointFrequencyRecords);
      }
      this.currentOffset = firstOffset;

      readerDelay.delayStart(currentOffset);
      // advanceImpl() pre-increments the offset and recomputes the split point flag, so step
      // back by one to make it land exactly on the first offset.
      --currentOffset;
      return advanceImpl();
    }

    /**
     * Moves to the next offset, generates its record and applies the per-record delay.
     *
     * @return always {@code true}; this method itself never signals the end of the range
     */
    @Override
    protected boolean advanceImpl() {
      currentOffset++;
      isAtSplitPoint = shouldSourceSplit();

      SyntheticSourceOptions.Record record =
          getCurrentSource().sourceOptions.genRecord(currentOffset);
      currentKvPair = record.kv;
      readerDelay.delayRecord(record);

      return true;
    }

    /**
     * Every offset is a split point when the frequency is zero; otherwise only multiples of the
     * frequency are.
     */
    private boolean shouldSourceSplit() {
      return (splitPointFrequencyRecords == 0) || (currentOffset % splitPointFrequencyRecords == 0);
    }

    /**
     * Returns the smallest multiple of {@code frequency} that is not smaller than {@code offset}.
     *
     * @param offset offset to align
     * @param frequency split point spacing; must be positive
     */
    private static long alignToSplitPoint(long offset, long frequency) {
      // floorMod keeps the remainder in [0, frequency) for negative offsets as well.
      long pastPreviousSplitPoint = Math.floorMod(offset, frequency);
      return pastPreviousSplitPoint == 0 ? offset : offset + (frequency - pastPreviousSplitPoint);
    }

    /**
     * Reports progress according to the configured progress shape: the superclass value for
     * {@code LINEAR}, and {@code 0.9 - 0.8 * fraction} (a value that decreases as reading
     * proceeds) for {@code LINEAR_REGRESSING}.
     */
    @Override
    public Double getFractionConsumed() {
      double realFractionConsumed = super.getFractionConsumed();
      SyntheticSourceOptions.ProgressShape shape = getCurrentSource().sourceOptions.progressShape;
      switch (shape) {
        case LINEAR:
          return realFractionConsumed;
        case LINEAR_REGRESSING:
          return 0.9 - 0.8 * realFractionConsumed;
        default:
          throw new AssertionError("Unexpected progress shape: " + shape);
      }
    }

    @Override
    protected boolean isAtSplitPoint() throws NoSuchElementException {
      return isAtSplitPoint;
    }

    @Override
    public void close() {
      // Nothing to release.
    }
  }
}